package storm.cookbook;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * Wires the HelloWorld spout and bolt together into a Storm topology and runs it.
 *
 * <p>When at least one command-line argument is given, the first argument is used as the
 * topology name and the topology is submitted through {@link StormSubmitter}. Otherwise the
 * topology is run in an in-process {@link LocalCluster} for a fixed amount of time and is
 * then killed and the cluster shut down.
 */
public class HelloWorldTopology {

    /** Component id of the spout; the bolt subscribes to the stream of this component. */
    private static final String SPOUT_ID = "randomHelloWorld";

    /** Component id of the bolt. */
    private static final String BOLT_ID = "HelloWorldBolt";

    /** Parallelism hint passed as the third argument of {@code setSpout}. */
    private static final int SPOUT_PARALLELISM = 10;

    /** Parallelism hint passed as the third argument of {@code setBolt}. */
    private static final int BOLT_PARALLELISM = 2;

    /** Value passed to {@code Config.setNumWorkers} when submitting via {@link StormSubmitter}. */
    private static final int SUBMIT_NUM_WORKERS = 3;

    /** Topology name used when running in the in-process local cluster. */
    private static final String LOCAL_TOPOLOGY_NAME = "test";

    /** How long the local topology is left running before it is killed, in milliseconds. */
    private static final long LOCAL_RUN_MILLIS = 10000;

    /**
     * Entry point: builds the topology and either submits it or runs it locally.
     *
     * @param args optional; if non-empty, {@code args[0]} is the name under which the topology
     *             is submitted via {@link StormSubmitter}. With no arguments (or {@code null})
     *             the topology runs in a {@link LocalCluster}.
     */
    public static void main(String[] args) {
        try {
            TopologyBuilder builder = buildTopology();

            Config conf = new Config();
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                conf.setNumWorkers(SUBMIT_NUM_WORKERS);

                // args[0] is the topology name.
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                runLocally(conf, builder);
            }
        } catch (Exception e) {
            // Top-level boundary: report the failure rather than propagating it, which keeps
            // the original behaviour of main never throwing to its caller.
            e.printStackTrace();
        }
    }

    /**
     * Declares the spout and the bolt and connects the bolt to the spout with a shuffle grouping.
     *
     * @return the builder holding the declared components; call {@code createTopology()} on it
     *         to obtain the topology to submit
     */
    private static TopologyBuilder buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        // The third argument of setSpout/setBolt is the parallelism hint.
        builder.setSpout(SPOUT_ID, new HelloWorldSpout(), SPOUT_PARALLELISM);
        builder.setBolt(BOLT_ID, new HelloWorldBolt(), BOLT_PARALLELISM)
                .shuffleGrouping(SPOUT_ID);
        return builder;
    }

    /**
     * Runs the topology in an in-process {@link LocalCluster} for {@link #LOCAL_RUN_MILLIS}
     * milliseconds, then kills it.
     *
     * <p>The cluster is shut down in a {@code finally} block so that a failure while submitting,
     * waiting, or killing the topology does not leave the local cluster running.
     *
     * @param conf    topology configuration
     * @param builder builder holding the declared components
     * @throws Exception if submitting or killing the topology fails
     */
    private static void runLocally(Config conf, TopologyBuilder builder) throws Exception {
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());

            Utils.sleep(LOCAL_RUN_MILLIS);
            cluster.killTopology(LOCAL_TOPOLOGY_NAME);
        } finally {
            cluster.shutdown();
        }
    }

}
